unicorn_bybit_websocket_api package¶
Submodules¶
unicorn_bybit_websocket_api.connection module¶
unicorn_bybit_websocket_api.exceptions module¶
- exception unicorn_bybit_websocket_api.exceptions.MaximumSubscriptionsExceeded(exchange: str = None, max_subscriptions_per_stream: int = None)[source]¶
Bases:
Exception
Exception if the maximum number of subscriptions per stream has been exceeded!
- exception unicorn_bybit_websocket_api.exceptions.Socks5ProxyConnectionError[source]¶
Bases:
Exception
Exception if the manager class is not able to establish a connection to the socks5 proxy.
- exception unicorn_bybit_websocket_api.exceptions.StreamIsCrashing(stream_id=None, reason=None)[source]¶
Bases:
Exception
Exception if the stream is crashing.
- exception unicorn_bybit_websocket_api.exceptions.StreamIsRestarting(stream_id=None, reason=None)[source]¶
Bases:
Exception
Exception if the stream is restarting.
unicorn_bybit_websocket_api.manager module¶
- class unicorn_bybit_websocket_api.manager.BybitWebSocketApiManager(process_stream_data: Callable | None = None, process_stream_data_async: Callable | None = None, process_asyncio_queue: Callable | None = None, exchange: str = 'bybit.com', warn_on_update: bool = True, restart_timeout: int = 6, show_secrets_in_logs: bool = False, output_default: Literal['dict', 'raw_data'] | None = 'raw_data', enable_stream_signal_buffer: bool = False, disable_colorama: bool = False, stream_buffer_maxlen: int | None = None, process_stream_signals=None, close_timeout_default: int = 1, ping_interval_default: int = 5, ping_timeout_default: int = 10, high_performance: bool = False, debug: bool = False, restful_base_uri: str = None, websocket_base_uri: str = None, max_subscriptions_per_stream_spot: int | None = None, max_subscriptions_per_stream_linear: int | None = None, max_subscriptions_per_stream_inverse: int | None = None, max_subscriptions_per_stream_option: int | None = None, socks5_proxy_server: str = None, socks5_proxy_user: str = None, socks5_proxy_pass: str = None, socks5_proxy_ssl_verification: bool = True, auto_data_cleanup_stopped_streams: bool = False, lucit_api_secret: str = None, lucit_license_ini: str = None, lucit_license_profile: str = None, lucit_license_token: str = None)[source]¶
Bases:
Thread
A Python SDK by LUCIT to use the Bybit Websocket APIs in a simple, fast, flexible, robust and fully-featured way.
Bybit.com API documentation:
- Parameters:
process_asyncio_queue (Optional[Callable]) – Insert your Asyncio function into the same AsyncIO loop in which the websocket data is received. This method guarantees the fastest possible asynchronous processing of the data in the correct receiving sequence. https://unicorn-bybit-websocket-api.docs.lucit.tech/readme.html#or-await-the-webstream-data-in-an-asyncio-coroutine
process_stream_data (Optional[Callable]) – Provide a function/method to process the received webstream data (callback). The function will be called instead of add_to_stream_buffer() like process_stream_data(stream_data, stream_buffer_name) where stream_data contains the raw_stream_data. If not provided, the raw stream_data will get stored in the stream_buffer or provided to a specific callback function of create_stream()! How to read from stream_buffer!
process_stream_data_async (Optional[Callable]) – Provide an asyncio function/method to process the received webstream data (callback). The function will be called instead of add_to_stream_buffer() like process_stream_data(stream_data, stream_buffer_name) where stream_data contains the raw_stream_data. If not provided, the raw stream_data will get stored in the stream_buffer or provided to a specific callback function of create_stream()! How to read from stream_buffer!
exchange (str) – Select bybit.com, bybit.com-testnet (default: bybit.com)
warn_on_update (bool) – set to False to disable the update warning of UBBWA, including when UBBRA is used as a submodule.
restart_timeout (int) – A stream restart must be successful within this time, otherwise a new restart will be initialized. Default is 6 seconds.
show_secrets_in_logs (bool) – set to True to show secrets like listen_key, api_key or api_secret in log file (default=False)
output_default (str) – set to “dict” to convert the received raw data to a python dict - otherwise with the default setting “raw_data” the output remains unchanged and gets delivered as received from the endpoints. Change this for a specific stream with the output parameter of create_stream() and replace_stream()
enable_stream_signal_buffer (bool) – set to True to enable the stream_signal_buffer and receive information about disconnects and reconnects to manage a restore of the lost data during the interruption or to recognize your bot got blind.
disable_colorama (bool) – set to True to disable the use of colorama
stream_buffer_maxlen (int or None) – Set a max len for the generic stream_buffer. This parameter can also be used within create_stream() for a specific stream_buffer.
process_stream_signals (function) – Provide a function/method to process the received stream signals. The function runs inside an asyncio loop and is called instead of add_to_stream_signal_buffer() like process_stream_signals(signal_type=False, stream_id=False, data_record=False).
auto_data_cleanup_stopped_streams (bool) – The parameter “auto_data_cleanup_stopped_streams=True” can be used to inform the UBBWA instance that all remaining data of a stopped stream should be automatically and completely deleted.
close_timeout_default (int) – The close_timeout parameter defines a maximum wait time in seconds for completing the closing handshake and terminating the TCP connection. This parameter is passed through to the websockets.client.connect()
ping_interval_default (int) – Once the connection is open, a Ping frame is sent every ping_interval seconds. This serves as a keepalive. It helps keeping the connection open, especially in the presence of proxies with short timeouts on inactive connections. Set ping_interval to None to disable this behavior. This parameter is passed through to the websockets.client.connect()
ping_timeout_default (int) – If the corresponding Pong frame isn’t received within ping_timeout seconds, the connection is considered unusable and is closed with code 1011. This ensures that the remote endpoint remains responsive. Set ping_timeout to None to disable this behavior. This parameter is passed through to the websockets.client.connect()
high_performance (bool) – Setting this to True makes create_stream() a non-blocking function
debug (bool) – If True the lib adds additional information to logging outputs
restful_base_uri (str) – Override restful_base_uri. Example: https://127.0.0.1
websocket_base_uri (str) – Override websocket_base_uri. Example: ws://127.0.0.1:8765/
max_subscriptions_per_stream_spot (int) – Override the max_subscriptions_per_stream_spot value. Example: 1024
socks5_proxy_server (str) – Set this to activate the usage of a socks5 proxy. Example: ‘127.0.0.1:9050’
socks5_proxy_user (str) – Set this to activate the usage of a socks5 proxy user. Example: ‘alice’
socks5_proxy_pass (str) – Set this to activate the usage of a socks5 proxy password.
socks5_proxy_ssl_verification (bool) – Set to False to disable SSL server verification. Default is True.
lucit_api_secret (str) – The api_secret of your UNICORN Bybit Suite license from https://shop.lucit.services/software/unicorn-trading-suite
lucit_license_ini (str) – Specify the path including filename to the config file (ex: ~/license_a.ini). If not provided lucitlicmgr tries to load a lucit_license.ini from the .lucit directory in the user's home directory (~/.lucit/).
lucit_license_profile (str) – The license profile to use. Default is ‘LUCIT’.
lucit_license_token (str) – The license_token of your UNICORN Bybit Suite license from https://shop.lucit.services/software/unicorn-trading-suite
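
The parameters above can be combined as in the following minimal sketch. It assumes the package is installed and a valid LUCIT license is configured. The names `handle_stream_data` and `start_manager` are hypothetical and only serve the illustration; the import is deferred into `start_manager()` so the callback can be tried without the package.

```python
import json


def handle_stream_data(stream_data, stream_buffer_name=False):
    """Callback with the documented (stream_data, stream_buffer_name) shape."""
    # With output_default="raw_data" the record arrives as received (a JSON
    # string is assumed here); with "dict" it is already a Python dict.
    record = json.loads(stream_data) if isinstance(stream_data, str) else stream_data
    print(record)
    return record


def start_manager():
    """Create a manager on the testnet; needs the package and a license."""
    # Deferred import: only required once a manager is actually created.
    from unicorn_bybit_websocket_api.manager import BybitWebSocketApiManager

    return BybitWebSocketApiManager(
        exchange="bybit.com-testnet",
        process_stream_data=handle_stream_data,
        output_default="dict",
        enable_stream_signal_buffer=True,
    )
```

Only keyword arguments listed in the signature above are used. Whether the callback's return value is evaluated by the manager is not stated in this section, so returning the record here is purely for convenience.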
- add_payload_to_stream(stream_id=None, payload: dict = None)[source]¶
Add a payload to a stream by stream_id.
- Parameters:
stream_id (str) – id of a stream
payload (dict) – The payload in JSON to add.
- Returns:
bool
- add_to_ringbuffer_error(error)[source]¶
Add received error messages from websocket endpoints to the error ringbuffer
- Parameters:
error (string) – The data to add.
- Returns:
bool
- add_to_ringbuffer_result(result)[source]¶
Add received result messages from websocket endpoints to the result ringbuffer
- Parameters:
result (string) – The data to add.
- Returns:
bool
- add_to_stream_buffer(stream_data, stream_buffer_name: Literal[False] | str = False)[source]¶
Kick back data to the stream_buffer
If it is not possible to process received stream data (for example, the database is restarting, so it’s not possible to save the data), you can return the data back into the stream_buffer. A few seconds after you stop writing data back to the stream_buffer, the BybitWebSocketApiManager starts flushing the data back to normal processing.
- Parameters:
stream_data (raw stream_data or unicorn_fied stream data) – the data you want to write back to the buffer
stream_buffer_name (False or str) – If False the data is going to get written to the default stream_buffer, set to True to read the data via pop_stream_data_from_stream_buffer(stream_id) or provide a string to create and use a shared stream_buffer and read it via pop_stream_data_from_stream_buffer(‘string’).
- Returns:
bool
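
The kick-back pattern described above could look like the following sketch. `store_or_kick_back` and the `save` callable are hypothetical helpers; only `add_to_stream_buffer()` is taken from this reference.

```python
def store_or_kick_back(manager, stream_data, save):
    """Try to persist a record; on failure return it to the stream_buffer.

    `save` is a hypothetical callable (for example a database insert) that
    raises an exception while the backend is unavailable.
    """
    try:
        save(stream_data)
        return True
    except Exception:
        # Documented manager method: writes the data back to the default
        # stream_buffer so it can be processed again later.
        manager.add_to_stream_buffer(stream_data)
        return False
```

Catching the broad `Exception` keeps the sketch short; in real code it is usually better to catch only the errors that indicate a temporary outage.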
- add_to_stream_signal_buffer(signal_type=None, stream_id=None, data_record=None, error_msg=None)[source]¶
Add signals about a stream to the stream_signal_buffer
- Parameters:
signal_type (str) – “CONNECT”, “DISCONNECT”, “FIRST_RECEIVED_DATA” or “STREAM_UNREPAIRABLE”
stream_id (str) – id of a stream
data_record (str or dict) – The last or first received data record
error_msg (str or dict) – The message of the error.
- Returns:
bool
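
A handler for these signals might be sketched as follows. The keyword arguments mirror the signature of add_to_stream_signal_buffer(); the names `handle_stream_signal`, `signal_counts` and `disconnected_streams` are hypothetical. Whether the manager expects a plain function or a coroutine for process_stream_signals is not stated in this section, so a plain function is assumed.

```python
from collections import Counter

signal_counts = Counter()       # how often each signal type was seen
disconnected_streams = set()    # stream ids that are currently disconnected


def handle_stream_signal(signal_type=None, stream_id=None, data_record=None, error_msg=None):
    """Track the connection state from the documented signal types."""
    signal_counts[signal_type] += 1
    if signal_type == "DISCONNECT":
        disconnected_streams.add(stream_id)
    elif signal_type in ("CONNECT", "FIRST_RECEIVED_DATA"):
        disconnected_streams.discard(stream_id)
    elif signal_type == "STREAM_UNREPAIRABLE":
        disconnected_streams.add(stream_id)
        print(f"stream {stream_id} cannot be repaired: {error_msg}")
```

While a stream id is in `disconnected_streams`, data may have been missed, which is the point at which a restore of the lost data could be triggered.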
- add_total_received_bytes(size)[source]¶
Add received bytes to the total received bytes statistic
- Parameters:
size (int) – int value of added bytes
- asyncio_queue_task_done(stream_id=None) bool [source]¶
If get_stream_data_from_asyncio_queue() was used, asyncio_queue_task_done() must be executed at the end of the loop.
- Parameters:
stream_id (str) – provide a stream_id - only needed for userData Streams (acquiring a listenKey)
- Returns:
bool
Example:
.. code-block:: python

    while True:
        data = await bybit_wsm.get_stream_data_from_asyncio_queue(stream_id)
        print(data)
        bybit_wsm.asyncio_queue_task_done(stream_id)
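
A slightly more defensive variant of this loop is sketched below. It uses only manager methods listed in this reference (is_manager_stopping(), get_stream_data_from_asyncio_queue(), asyncio_queue_task_done()); the coroutine name `consume` and the `handle` callable are hypothetical.

```python
import asyncio


async def consume(manager, stream_id, handle):
    """Read records until the manager reports that it is stopping."""
    while not manager.is_manager_stopping():
        data = await manager.get_stream_data_from_asyncio_queue(stream_id)
        try:
            handle(data)
        finally:
            # Mark the item as done even if handle() raised.
            manager.asyncio_queue_task_done(stream_id)
```

The `try`/`finally` ensures that every retrieved item is acknowledged, so an exception in `handle()` does not leave the queue with an unfinished task.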
- clear_asyncio_queue(stream_id: str = None) bool [source]¶
Clear the asyncio queue of a specific stream.
- Parameters:
stream_id (str) – provide a stream_id
- Returns:
bool
- clear_stream_buffer(stream_buffer_name: Literal[False] | str = False)[source]¶
Clear the stream_buffer
- Parameters:
stream_buffer_name (False or str) – False to clear the generic stream_buffer, the stream_id if you used True in create_stream() or the string name of a shared stream_buffer.
- Returns:
bool
- create_payload(stream_id, method, channels=None, markets=None)[source]¶
Create the payload for subscriptions
- Parameters:
stream_id (str) – provide a stream_id
method (str) – SUBSCRIBE or UNSUBSCRIBE
channels (str, list, set) – provide the channels to create the URI
markets (str, list, set) – provide the markets to create the URI
- Returns:
payload (list) or False
- create_stream(channels: str | List[str] | Set[str] | None = None, endpoint: str = None, markets: str | List[str] | Set[str] | None = None, stream_label: str = None, stream_buffer_name: Literal[False] | str = False, api_key: str = None, api_secret: str = None, output: Literal['dict', 'raw_data'] | None = None, ping_interval: int = None, ping_timeout: int = None, close_timeout: int = None, stream_buffer_maxlen: int = None, process_stream_data: Callable | None = None, process_stream_data_async: Callable | None = None, process_asyncio_queue: Callable | None = None)[source]¶
Create a websocket stream
If you provide 2 markets and 2 channels, then you are going to create 4 subscriptions (markets * channels).
Example:
channels = ['trade', 'kline_1']
markets = ['bnbbtc', 'ethbtc']
Finally: bnbbtc@trade, ethbtc@trade, bnbbtc@kline.1, ethbtc@kline.1
There is a subscriptions limit per stream!
- Parameters:
channels (str, list, set) – provide the channels you wish to stream
endpoint (str) – provide the endpoint
markets (str, list, set) – provide the markets you wish to stream
stream_label (str) – provide a stream_label to identify the stream
stream_buffer_name (bool or str) – If False the data is going to get written to the default stream_buffer, set to True to read the data via pop_stream_data_from_stream_buffer(stream_id) or provide a string to create and use a shared stream_buffer and read it via pop_stream_data_from_stream_buffer(‘string’).
api_key (str) – provide a valid Bybit API key
api_secret (str) – provide a valid Bybit API secret
output (str) – the default setting raw_data can be globally overwritten with the parameter output_default of BybitWebSocketApiManager. To overrule the output_default value for this specific stream, set output to “dict” to convert the received raw data to a python dict - otherwise with the default setting “raw_data” the output remains unchanged and gets delivered as received from the endpoints
ping_interval (int or None) – Once the connection is open, a Ping frame is sent every ping_interval seconds. This serves as a keepalive. It helps keeping the connection open, especially in the presence of proxies with short timeouts on inactive connections. Set ping_interval to None to disable this behavior. (default: 20) This parameter is passed through to the websockets.client.connect()
ping_timeout (int or None) – If the corresponding Pong frame isn’t received within ping_timeout seconds, the connection is considered unusable and is closed with code 1011. This ensures that the remote endpoint remains responsive. Set ping_timeout to None to disable this behavior. (default: 20) This parameter is passed through to the websockets.client.connect()
close_timeout (int or None) – The close_timeout parameter defines a maximum wait time in seconds for completing the closing handshake and terminating the TCP connection. (default: 10) This parameter is passed through to the websockets.client.connect()
stream_buffer_maxlen (int or None) – Set a max len for the stream_buffer. Only used in combination with a non-generic stream_buffer. The generic stream_buffer always uses the value of BybitWebSocketApiManager().
process_stream_data (function) – Provide a function/method to process the received webstream data (callback). The function will be called instead of add_to_stream_buffer() like process_stream_data(stream_data) where stream_data contains the raw_stream_data. If not provided, the raw stream_data will get stored in the stream_buffer or provided to the global callback function provided during object instantiation! How to read from stream_buffer!
process_stream_data_async (function) – Provide an asynchronous function/method to process the received webstream data (callback). The function will be called instead of add_to_stream_buffer() like process_stream_data(stream_data) where stream_data contains the raw_stream_data. If not provided, the raw stream_data will get stored in the stream_buffer or provided to the global callback function provided during object instantiation! How to read from stream_buffer!
process_asyncio_queue (Optional[Callable]) – Insert your Asyncio function into the same AsyncIO loop in which the websocket data is received. This method guarantees the fastest possible asynchronous processing of the data in the correct receiving sequence. https://unicorn-bybit-websocket-api.docs.lucit.tech/readme.html#or-await-the-webstream-data-in-an-asyncio-coroutine
- Returns:
stream_id or None
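
The "markets * channels" rule can be checked before calling create_stream(). The sketch below is plain Python and does not touch the library; `count_subscriptions` and `fits_in_one_stream` are hypothetical helper names, and the limit is supplied by the caller (for example the value returned by get_limit_of_subscriptions_per_stream()).

```python
from itertools import product


def count_subscriptions(channels, markets):
    """Return the number of (market, channel) pairs a stream would hold."""
    # A single string counts as one item, mirroring the str | list | set typing.
    channels = [channels] if isinstance(channels, str) else list(channels)
    markets = [markets] if isinstance(markets, str) else list(markets)
    return len(list(product(markets, channels)))


def fits_in_one_stream(channels, markets, limit):
    """Check the pair count against a caller-supplied per-stream limit."""
    return count_subscriptions(channels, markets) <= limit


print(count_subscriptions(["trade", "kline_1"], ["bnbbtc", "ethbtc"]))  # 4
```

If the count exceeds the limit, the markets can be split across several streams instead of risking a MaximumSubscriptionsExceeded exception.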
- create_websocket_uri(channels=None, endpoint=None, markets=None, stream_id=None) str [source]¶
Create a websocket URI
- Parameters:
channels (str, list, set) – provide the channels to create the URI
endpoint (str) – provide the endpoint to create the URI
markets (str, list, set) – provide the markets to create the URI
stream_id (str) – provide a stream_id - only needed for userData Streams (acquiring a listenKey)
- Returns:
str or None
- delete_listen_key_by_stream_id(stream_id) bool [source]¶
Delete a Bybit listen_key from a specific !userData stream
- Parameters:
stream_id (str) – id of a !userData stream
- Returns:
bool
- delete_stream_from_stream_list(stream_id, timeout: float = 10.0) bool [source]¶
Delete a stream from the stream_list
Even if a stream crashes or gets stopped, its data remains in the BybitWebSocketApiManager until you stop the BybitWebSocketApiManager itself. If you want to tidy up the stream_list you can use this method.
- Parameters:
stream_id (str) – id of a stream
timeout (float) – The timeout for how long to wait for the stream to stop. The function aborts if the waiting time is exceeded and returns False.
- Returns:
bool
- static fill_up_space_centered(demand_of_chars, string, filling=' ')[source]¶
Add whitespaces to string to a length of demand_of_chars
- Parameters:
demand_of_chars (int) – the number of characters the resulting string must have
string (str) – the string that has to get filled up with spaces
filling (str) – filling char (default: blank space)
- Returns:
the filled up string
- static fill_up_space_left(demand_of_chars, string, filling=' ')[source]¶
Add whitespaces to string to a length of demand_of_chars on the left side
- Parameters:
demand_of_chars (int) – the number of characters the resulting string must have
string (str) – the string that has to get filled up with spaces
filling (str) – filling char (default: blank space)
- Returns:
the filled up string
- static fill_up_space_right(demand_of_chars, string, filling=' ')[source]¶
Add whitespaces to string to a length of demand_of_chars on the right side
- Parameters:
demand_of_chars (int) – the number of characters the resulting string must have
string (str) – the string that has to get filled up with spaces
filling (str) – filling char (default: blank space)
- Returns:
the filled up string
- generate_signature(api_secret=None, data=None)[source]¶
Sign the request.
- Parameters:
api_secret
data
- Returns:
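
This section does not document how generate_signature() computes its result. As a generic illustration of request signing, the sketch below shows an HMAC-SHA256 signature, a scheme commonly used for exchange API authentication. That this matches the method's internals is an assumption, and `sign_hmac_sha256` is a hypothetical name.

```python
import hashlib
import hmac


def sign_hmac_sha256(api_secret, data):
    """Return the hex HMAC-SHA256 digest of `data` keyed with `api_secret`."""
    return hmac.new(
        api_secret.encode("utf-8"),
        data.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
```

The result is a 64-character hexadecimal string; the same secret and data always produce the same signature.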
- get_all_receives_last_second()[source]¶
Get the number of all receives of the last second
- Returns:
int
- get_bybit_api_status()[source]¶
get_bybit_api_status() is obsolete and will be removed in future releases, please use get_used_weight() instead!
- Returns:
dict
- get_current_receiving_speed(stream_id)[source]¶
Get the receiving speed of the last second in Bytes
- Returns:
int
- get_current_receiving_speed_global()[source]¶
Get the receiving speed of the last second in Bytes from all streams!
- Returns:
int
- static get_date_of_timestamp(timestamp)[source]¶
Convert a timestamp into a readable date/time format for humans
- Parameters:
timestamp (timestamp) – provide the timestamp you want to convert into a date
- Returns:
str
- get_errors_from_endpoints()[source]¶
Get all the stored error messages from the ringbuffer sent by the endpoints.
- Returns:
list
- get_event_loop_by_stream_id(stream_id: str | bool | None = False) AbstractEventLoop | None [source]¶
Get the asyncio event loop used by a specific stream.
- Returns:
asyncio.AbstractEventLoop or None
- get_exchange()[source]¶
Get the name of the used exchange like “bybit.com” or “bybit.com-testnet”
- Returns:
str
- static get_human_bytesize(amount_bytes, suffix='')[source]¶
Convert the bytes to something readable
- Parameters:
amount_bytes (int) – amount of bytes
suffix (str) – add a string after
- Returns:
- static get_human_uptime(uptime)[source]¶
Convert a timespan of seconds into hours, days, …
- Parameters:
uptime (int) – Uptime in seconds
- Returns:
- get_keep_max_received_last_second_entries()[source]¶
Get the number of received_last_second entries that are kept before they get deleted
- Returns:
int
- static get_latest_release_info()[source]¶
Get information about the latest available release
- Returns:
dict or None
- static get_latest_release_info_check_command()[source]¶
Get information about the latest available check_lucit_collector release
- Returns:
dict or None
- get_latest_version() str | None [source]¶
Get the version of the latest available release (cache time 1 hour)
- Returns:
str or None
- get_latest_version_check_command() str | None [source]¶
Get the version of the latest available check_lucit_collector.py release (cache time 1 hour)
- Returns:
str or None
- get_limit_of_subscriptions_per_stream()[source]¶
Get the number of allowed active subscriptions per stream (limit of Bybit API)
- Returns:
int
- get_most_receives_per_second()[source]¶
Get the highest total receives per second value
- Returns:
int
- static get_new_uuid_id() str [source]¶
Get a new unique uuid in string format. This is used as ‘stream_id’ or ‘socket_id’.
- Returns:
uuid (str)
- get_number_of_streams_in_stream_list()[source]¶
Get the number of streams that are stored in the stream_list
- Returns:
int
- get_number_of_subscriptions(stream_id)[source]¶
Get the number of subscriptions of a specific stream
- Returns:
int
- static get_process_usage_threads()[source]¶
Get the amount of threads that this process is using
- Returns:
int
- get_result_by_request_id(request_id=None, timeout=10)[source]¶
Get the result related to the provided request_id
- Parameters:
request_id (stream_id (uuid)) – if you run get_stream_subscriptions() it returns a unique request_id - provide it to this method to receive the result.
timeout (int) – seconds to wait to receive the result. If not there it returns ‘False’
- Returns:
result or None
- get_results_from_endpoints()[source]¶
Get all the stored result messages from the ringbuffer sent by the endpoints.
- Returns:
list
- get_ringbuffer_error_max_size()[source]¶
How many entries should be stored in the ringbuffer?
- Returns:
int
- get_ringbuffer_result_max_size()[source]¶
How many entries should be stored in the ringbuffer?
- Returns:
int
- get_start_time()[source]¶
Get the start_time of the BybitWebSocketApiManager instance
- Returns:
timestamp
- get_stream_buffer_byte_size()[source]¶
Get the current byte size estimation of the stream_buffer
- Returns:
int
- get_stream_buffer_length(stream_buffer_name: Literal[False] | str = False)[source]¶
Get the current number of items in all stream_buffer or of a specific stream_buffer
- Parameters:
stream_buffer_name (str or stream_id) – Name of the stream_buffer
- Returns:
int
- get_stream_buffer_maxlen(stream_buffer_name: Literal[False] | str = False)[source]¶
Get the maxlen value of the stream_buffer
If maxlen is not specified or is None, stream_buffer may grow to an arbitrary length. Otherwise, the stream_buffer is bounded to the specified maximum length. Once a bounded length stream_buffer is full, when new items are added, a corresponding number of items are discarded from the opposite end.
- Parameters:
stream_buffer_name (False or str) – False to read from generic stream_buffer, the stream_id if you used True in create_stream() or the string name of a shared stream_buffer.
- Returns:
int or False
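
The bounded behaviour described above is the same as that of Python's `collections.deque` with a `maxlen`. The sketch below demonstrates it with a plain deque; that the stream_buffer is implemented this way is an assumption based on the matching wording.

```python
from collections import deque

buffer = deque(maxlen=3)   # bounded, like a stream_buffer with maxlen=3
for item in range(5):
    buffer.append(item)    # appending to a full deque drops the oldest item

print(list(buffer))        # [2, 3, 4]
```

With a bounded buffer a slow consumer silently loses the oldest records, so a maxlen trades completeness for a fixed memory footprint.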
- async get_stream_data_from_asyncio_queue(stream_id=None)[source]¶
Retrieves the oldest entry from the FIFO stack.
- Parameters:
stream_id (str) – provide a stream_id - only needed for userData Streams (acquiring a listenKey)
- Returns:
stream_data - str, dict or None
- get_stream_id_by_label(stream_label: str = None) str | None [source]¶
Get the stream_id of a specific stream by stream label
- Parameters:
stream_label (str) – stream_label of the stream you search
- Returns:
stream_id or None
- get_stream_info(stream_id)[source]¶
Get all information about a specific stream
- Parameters:
stream_id (str) – id of a stream
- Returns:
set
- get_stream_label(stream_id=None)[source]¶
Get the stream_label of a specific stream
- Parameters:
stream_id (str) – id of a stream
- Returns:
str or None
- get_stream_receives_last_second(stream_id)[source]¶
Get the number of receives of a specific stream from the last second
- Parameters:
stream_id (str) – id of a stream
- Returns:
int
- get_stream_statistic(stream_id)[source]¶
Get the statistic of a specific stream
- Parameters:
stream_id (str) – id of a stream
- Returns:
set
- get_stream_subscriptions(stream_id, request_id=None)[source]¶
Get a list of subscriptions of a specific stream from Bybit endpoints - the result can be received via the stream_buffer and is also added to the results ringbuffer - get_results_from_endpoints() to get all results or use get_result_by_request_id(request_id) to get a specific one!
This function is supported by CEX endpoints only!
Info: …
- Parameters:
stream_id (str) – id of a stream
request_id (int) – id to use for the request - use get_request_id() to create a unique id. If not provided or False, then this method is using get_request_id() automatically.
- Returns:
request_id (int)
- get_the_one_active_websocket_api() str | None [source]¶
This function is needed to simplify the access to the websocket API, if only one API stream exists it is clear that only this stream can be used for the requests and therefore will be used.
- Returns:
stream_id or None (str)
- get_used_weight()[source]¶
Get used_weight, last status_code and the timestamp of the last status update
- Returns:
dict
- get_user_agent()[source]¶
Get the user_agent string “lib name + lib version + python version”
- Returns:
- increase_processed_receives_statistic(stream_id)[source]¶
Add the number of processed receives
- Parameters:
stream_id (str) – id of a stream
- increase_received_bytes_per_second(stream_id, size)[source]¶
Add the amount of received bytes per second
- Parameters:
stream_id (str) – id of a stream
size (int) – amount of bytes to add
- increase_reconnect_counter(stream_id=None)[source]¶
Increase reconnect counter
- Parameters:
stream_id (str) – id of a stream
- increase_transmitted_counter(stream_id)[source]¶
Increase the counter of transmitted payloads
- Parameters:
stream_id (str) – id of a stream
- is_crash_request(stream_id) bool [source]¶
Does a specific stream have a crash_request?
- Parameters:
stream_id (str) – id of a stream
- Returns:
bool
- is_manager_stopping()[source]¶
Returns True if the manager has a stop request, False if not.
- Returns:
bool
- is_socket_ready(stream_id: str = None) bool [source]¶
Check whether the socket of a specific stream is ready (socket_is_ready).
- Parameters:
stream_id (str) – id of the stream
- is_stop_request(stream_id) bool [source]¶
Does a specific stream have a stop_request?
- Parameters:
stream_id (str) – id of a stream
- Returns:
bool
- is_update_available_check_command(check_command_version=None)[source]¶
Is a new release of check_lucit_collector.py available?
- Returns:
bool
- static order_params(data)[source]¶
Convert params to list with signature as last element
- Parameters:
data
- Returns:
- pop_stream_data_from_stream_buffer(stream_buffer_name: Literal[False] | str = None, mode='FIFO')[source]¶
Get oldest or latest entry from stream_buffer and remove from FIFO/LIFO stack.
- Parameters:
stream_buffer_name (False or str) – False to read from generic stream_buffer, the stream_id if you used True in create_stream() or the string name of a shared stream_buffer.
mode (str) – How to read from the stream_buffer - “FIFO” (default) or “LIFO”.
- Returns:
stream_data - str, dict or None
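
Reading from a stream_buffer is typically done in a polling loop. The sketch below drains a buffer with the documented pop_stream_data_from_stream_buffer() method; `drain_stream_buffer` and its `limit` argument are hypothetical.

```python
def drain_stream_buffer(manager, stream_buffer_name=False, mode="FIFO", limit=100):
    """Pop up to `limit` records from a stream_buffer and return them."""
    records = []
    for _ in range(limit):
        record = manager.pop_stream_data_from_stream_buffer(stream_buffer_name, mode=mode)
        # An empty buffer is signalled by None per the Returns entry; False is
        # also treated as empty here as a defensive assumption.
        if record is None or record is False:
            break
        records.append(record)
    return records
```

With `mode="FIFO"` the records come back oldest first, with `mode="LIFO"` newest first. The `limit` keeps a single call from blocking other work while the buffer is being refilled.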
- pop_stream_signal_from_stream_signal_buffer()[source]¶
Get the oldest entry from stream_signal_buffer and remove from stack/pipe (FIFO stack)
- Returns:
stream_signal - dict or False
- print_stream_info(stream_id: str = None, add_string: str = None, footer: str = None, title: str = None)[source]¶
Print all information about a specific stream, helps debugging :)
- Parameters:
stream_id (str) – id of a stream
add_string (str) – text to add to the output
footer (str) – set a footer (last row) for print_summary output
title (str) – set a title (first row) for the output
- print_summary(add_string: str = None, disable_print: bool = False, footer: str = None, title: str = None)[source]¶
Print an overview of all streams
- Parameters:
add_string (str) – text to add to the output
disable_print (bool) – set to True to use curses instead of print()
footer (str) – set a footer (last row) for print_summary output
title (str) – set a title (first row) for print_summary output
- print_summary_to_png(print_summary_export_path, height_per_row=12.5, add_string: str = None, footer: str = None, title: str = None)[source]¶
Create a PNG image file with the console output of print_summary()
LINUX ONLY. It should not be hard to make it OS independent: https://github.com/LUCIT-Systems-and-Development/unicorn-bybit-websocket-api/issues/61
- Parameters:
print_summary_export_path (str) – If you want to export the output of print_summary() to an image, please provide a path like “/var/www/html/”. View the Wiki!
height_per_row (float) – set the height per row for the image height calculation
add_string (str) – text to add to the output
footer (str) – set a footer (last row) for print_summary output
title (str) – set a title (first row) for print_summary output
- Returns:
bool
- remove_all_data_of_stream_id(stream_id, timeout: float = 10.0) bool [source]¶
Delete all remaining data within the UBBWA instance of a stopped stream.
Even if a stream crashes or gets stopped, its data remains in the BybitWebSocketApiManager until you stop the BybitWebSocketApiManager itself. If you want to tidy up the entire UBBWA instance you can use this method.
BybitWebSocketApiManager accepts the parameter auto_data_cleanup_stopped_streams. If this is set to True (auto_data_cleanup_stopped_streams=True), the UBBWA instance performs the cleanup with this function remove_all_data_of_stream_id() automatically and regularly.
- Parameters:
stream_id (str) – id of a stream
timeout (float) – The timeout for how long to wait for the stream to stop. The function aborts if the waiting time is exceeded and returns False.
- Returns:
bool
- static remove_ansi_escape_codes(text)[source]¶
Remove ansi escape codes from the text string!
- Parameters:
text (str) – The text :)
- Returns:
str
- replace_stream(stream_id, new_channels, new_markets, new_stream_label=None, new_stream_buffer_name: Literal[False] | str = False, new_api_key=None, new_api_secret=None, new_symbols=None, new_output: Literal['dict', 'raw_data'] | None = None, new_ping_interval=20, new_ping_timeout=20, new_close_timeout=10, new_stream_buffer_maxlen=None)[source]¶
Replace a stream
If you want to start a stream with a new config, it is recommended to first start a new stream with the new settings and to close the old stream only after the new stream has received its first data. This way your data stays consistent.
- Parameters:
stream_id (str) – id of the old stream
new_channels (str, tuple, list, set) – the new channel list for the stream
new_markets (str, tuple, list, set) – the new markets list for the stream
new_stream_label (str) – provide a stream_label to identify the stream
new_stream_buffer_name (False or str) – If False the data is going to get written to the default stream_buffer, set to True to read the data via pop_stream_data_from_stream_buffer(stream_id) or provide a string to create and use a shared stream_buffer and read it via pop_stream_data_from_stream_buffer(‘string’).
new_api_key (str) – provide a valid Bybit API key
new_api_secret (str) – provide a valid Bybit API secret
new_symbols (str) – provide the symbols for isolated_margin user_data streams
new_output (str) – set to “dict” to convert the received raw data to a python dict - otherwise the output remains unchanged and gets delivered as received from the endpoints
new_ping_interval (int or None) – Once the connection is open, a Ping frame is sent every ping_interval seconds. This serves as a keepalive. It helps keeping the connection open, especially in the presence of proxies with short timeouts on inactive connections. Set ping_interval to None to disable this behavior. (default: 20) This parameter is passed through to the websockets.client.connect()
new_ping_timeout (int or None) – If the corresponding Pong frame isn’t received within ping_timeout seconds, the connection is considered unusable and is closed with code 1011. This ensures that the remote endpoint remains responsive. Set ping_timeout to None to disable this behavior. (default: 20) This parameter is passed through to the websockets.client.connect()
new_close_timeout (int or None) – The close_timeout parameter defines a maximum wait time in seconds for completing the closing handshake and terminating the TCP connection. (default: 10) This parameter is passed through to the websockets.client.connect()
new_stream_buffer_maxlen (int or None) – Set a max len for the stream_buffer. Only used in combination with a non-generic stream_buffer. The generic stream_buffer always uses the value of BybitWebSocketApiManager().
- Returns:
new stream_id or ‘None’
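The recommendation above (start the new stream first and retire the old one only after the new one has delivered data) can be sketched as follows. This is a minimal sketch, not the library's implementation: `swap_stream` and `FakeManager` are hypothetical names, `start_new_stream` is a caller-supplied callable because the stream-creation call is not shown in this section, and only `wait_till_stream_has_started()` and `stop_stream()` are taken from this reference.

```python
from typing import Callable


def swap_stream(manager, old_stream_id: str,
                start_new_stream: Callable[[], str],
                timeout: float = 10.0) -> str:
    """Start a replacement stream and stop the old one only after the new one runs.

    `manager` is any object offering wait_till_stream_has_started() and
    stop_stream() as documented in this reference. `start_new_stream` is a
    hypothetical callable that creates the new stream and returns its id.
    """
    new_stream_id = start_new_stream()
    # Keep the old stream alive until the new one has received its first data.
    if manager.wait_till_stream_has_started(new_stream_id, timeout=timeout):
        manager.stop_stream(old_stream_id)
        return new_stream_id
    # The new stream did not come up in time: discard it and keep the old one.
    manager.stop_stream(new_stream_id)
    return old_stream_id


class FakeManager:
    """Hypothetical stand-in so the sketch runs without a network connection."""

    def __init__(self):
        self.stopped = []

    def wait_till_stream_has_started(self, stream_id, timeout=0.0):
        return True  # pretend the stream received data immediately

    def stop_stream(self, stream_id, delete_listen_key=True):
        self.stopped.append(stream_id)
        return True


manager = FakeManager()
active_id = swap_stream(manager, "old-id", lambda: "new-id")
```

With a real manager instance, `start_new_stream` would wrap whichever call creates the stream with the new settings.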
- send_stream_signal(signal_type=None, stream_id=None, data_record=None, error_msg=None) bool [source]¶
Send a stream signal
- send_with_stream(stream_id: str = None, payload: dict | str = None, timeout: float = 5.0) bool [source]¶
Send a payload with a specific stream.
- Parameters:
stream_id (str) – id of the stream to be used for sending.
payload (dict or str(JSON)) – The payload to add.
timeout (float or int) – Timeout to wait for a ready stream.
- Returns:
bool
- set_heartbeat(stream_id) None [source]¶
Set heartbeat for a specific thread (should only be done by the stream itself)
- Returns:
None
- set_keep_max_received_last_second_entries(number_of_max_entries)[source]¶
Set how many received_last_second entries are stored before they get deleted.
- Parameters:
number_of_max_entries (int) – number of entries to keep in list
- set_ringbuffer_error_max_size(max_size)[source]¶
How many error messages should be kept in the ringbuffer?
- Parameters:
max_size (int) – Max entries of error messages in the ringbuffer.
- Returns:
bool
- set_ringbuffer_result_max_size(max_size)[source]¶
How many result messages should be kept in the ringbuffer?
- Parameters:
max_size (int) – Max entries of result messages in the ringbuffer.
- Returns:
bool
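For orientation, the general idea of a size-limited ring buffer can be sketched with `collections.deque`. How the manager stores these messages internally is not documented here, so treat this as an illustration of the concept only; `error_ringbuffer` and its size are hypothetical.

```python
from collections import deque

# Hypothetical stand-in for a bounded message log; name and size are illustrative.
error_ringbuffer = deque(maxlen=3)

for number in range(5):
    error_ringbuffer.append(f"error {number}")

# Once the maximum size is reached, the oldest entries are dropped first.
remaining = list(error_ringbuffer)
```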
- set_socket_is_not_ready(stream_id: str) bool [source]¶
Set socket_is_ready for a specific stream to False.
- Parameters:
stream_id (str) – id of the stream
- Returns:
bool
- set_socket_is_ready(stream_id: str) bool [source]¶
Set socket_is_ready for a specific stream to True.
- Parameters:
stream_id (str) – id of the stream
- Returns:
bool
- set_stream_label(stream_id, stream_label=None) bool [source]¶
Set a stream_label by stream_id
- Parameters:
stream_id (str) – id of the stream
stream_label (str) – stream_label to set
- Returns:
bool
- split_payload(params, method, max_items_per_request=350)[source]¶
Sending more than 8000 chars via websocket.send() leads to a connection loss. A limit of 350 list elements keeps the payload length under 8000 chars and avoids reconnects.
- Parameters:
params (list) – params of subscribe payload
method (str) – SUBSCRIBE or UNSUBSCRIBE
max_items_per_request (int) – max size for params; if params contains more items, it gets split
- Returns:
list or False
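The splitting described above amounts to cutting the params list into chunks of at most `max_items_per_request` items. The following is a minimal sketch of that idea, not the library's code: `split_params` is a hypothetical helper, the `{"method": ..., "params": ...}` payload layout is an assumption, and the sketch returns an empty list for empty input, whereas the documented method may return False.

```python
from typing import Any


def split_params(params: list, method: str,
                 max_items_per_request: int = 350) -> list[dict[str, Any]]:
    """Split `params` into payloads of at most `max_items_per_request` items."""
    if max_items_per_request < 1:
        raise ValueError("max_items_per_request must be at least 1")
    payloads = []
    for start in range(0, len(params), max_items_per_request):
        chunk = params[start:start + max_items_per_request]
        # Assumed payload layout; the real wire format may differ.
        payloads.append({"method": method, "params": chunk})
    return payloads


payloads = split_params([f"topic{i}" for i in range(800)], "SUBSCRIBE")
sizes = [len(payload["params"]) for payload in payloads]
```

Here 800 items end up in three payloads, the last one holding the remainder.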
- stop_manager(close_api_session: bool = True)[source]¶
Stop the BybitWebSocketApiManager with all streams, monitoring and management threads
- stop_manager_with_all_streams(close_api_session: bool = True)[source]¶
Stop the BybitWebSocketApiManager with all streams, monitoring and management threads
Alias of ‘stop_manager()’
- stop_stream(stream_id, delete_listen_key: bool = True)[source]¶
Stop a specific stream
- Parameters:
stream_id (str) – id of a stream
delete_listen_key (bool) – If set to True (default), the listen_key gets deleted. Set to False if you run more than one userData stream with this listen_key!
- Returns:
bool
- subscribe_to_stream(stream_id: str = None, channels=None, markets=None) bool [source]¶
Subscribe channels and/or markets to an existing stream
If you provide one channel and one market, then every subscribed market is going to get added to the new channel and all subscribed channels are going to get added to the new market!
How are the parameters channels and markets used with subscriptions?
- Parameters:
stream_id (str) – id of a stream
channels (str, list, set) – provide the channels you wish to subscribe
markets (str, list, set) – provide the markets you wish to subscribe
- Returns:
bool
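One way to read the combination rule above is that a stream is subscribed to every pairing of its channels with its markets, so adding one channel and one market extends both sets. The sketch below illustrates that reading with plain sets. It is an assumption about the behaviour, not library code, and the channel and market names are hypothetical.

```python
from itertools import product


def combined_subscriptions(channels: set, markets: set) -> set:
    """Return every (channel, market) pair implied by the two sets."""
    return set(product(channels, markets))


# Hypothetical channel and market names, for illustration only.
channels = {"channel_a"}
markets = {"market_x", "market_y"}
before = combined_subscriptions(channels, markets)

# Subscribing one new channel and one new market in a single call.
channels |= {"channel_b"}
markets |= {"market_z"}
after = combined_subscriptions(channels, markets)

# Pairs that exist only because of the new channel and the new market.
added = after - before
```

Under this reading, two existing pairs grow to six, which is why subscribing a channel and a market together can add more subscriptions than expected.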
- unsubscribe_from_stream(stream_id: str = None, channels=None, markets=None) bool [source]¶
Unsubscribe channels and/or markets from an existing stream
If you provide one channel and one market, then all subscribed markets from the specific channel and all subscribed channels from the specific markets are going to be removed!
How are the parameters channels and markets used with subscriptions?
- Parameters:
stream_id (str) – id of a stream
channels (str, list, set) – provide the channels you wish to unsubscribe
markets (str, list, set) – provide the markets you wish to unsubscribe
- Returns:
bool
- wait_till_stream_has_started(stream_id, timeout: float = 0.0) bool [source]¶
Returns True as soon as a specific stream has started and received its first stream data
- Parameters:
stream_id (str) – id of a stream
timeout (float) – The timeout for how long to wait for the stream to start. The function aborts if the waiting time is exceeded and returns False.
- Returns:
bool
- wait_till_stream_has_stopped(stream_id: str = None, timeout: float = 0.0) bool [source]¶
Returns True as soon as a specific stream has stopped itself
- Parameters:
stream_id (str) – id of a stream
timeout (float) – The timeout for how long to wait for the stream to stop. The function aborts if the waiting time is exceeded and returns False.
- Returns:
bool
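Both wait methods return a bool and give up once the timeout is exceeded. A generic sketch of that contract is shown below. It assumes a simple polling loop and assumes that the default `timeout=0.0` means "no limit", which this reference does not state; `wait_until` and `poll_interval` are hypothetical names.

```python
import time
from typing import Callable


def wait_until(condition: Callable[[], bool], timeout: float = 0.0,
               poll_interval: float = 0.01) -> bool:
    """Poll `condition` until it is true or `timeout` seconds have passed.

    Assumption: a timeout of 0.0 means "wait without a limit".
    """
    deadline = None if timeout == 0.0 else time.monotonic() + timeout
    while not condition():
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True


started_at = time.monotonic()
# Becomes true after roughly 50 ms, well inside the one-second timeout.
became_true = wait_until(lambda: time.monotonic() - started_at > 0.05, timeout=1.0)
# Never becomes true, so the call gives up after the timeout.
timed_out = wait_until(lambda: False, timeout=0.05)
```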
unicorn_bybit_websocket_api.restclient module¶
- class unicorn_bybit_websocket_api.restclient.BybitWebSocketApiRestclient(debug: bool | None = False, disable_colorama: bool | None = False, exchange: str | None = 'bybit.com', lucit_api_secret: str | None = None, lucit_license_ini: str = None, lucit_license_profile: str | None = None, lucit_license_token: str | None = None, restful_base_uri: str | None = None, show_secrets_in_logs: bool | None = False, socks5_proxy_server: str | None = None, socks5_proxy_user: str | None = None, socks5_proxy_pass: str | None = None, socks5_proxy_ssl_verification: bool | None = True, stream_list: dict = None, warn_on_update: bool | None = True)[source]¶
Bases:
object