import io
import numpy as np
from IPython import embed
from minio import Minio
from minio.error import BucketAlreadyExists
from minio.error import BucketAlreadyOwnedByYou
from minio.error import ResponseError
# Connection settings for the MinIO server this script talks to.
# NOTE(review): credentials are hard-coded in source; presumably intended for
# a local test server only - confirm before reusing elsewhere.
ip = "127.0.0.1:50055"  # endpoint as "host:port", passed to Minio()
access_key = "admin"  # forwarded as Minio(access_key=...)
secure_key = "password"  # forwarded as Minio(secret_key=...)
if __name__ == '__main__':
    # Round-trip demo: upload the raw bytes of a random array to the object
    # store, download them again, and print whether the two arrays match.
    data1 = np.random.random(1000)
    client = Minio(ip,
                   access_key=access_key,
                   secret_key=secure_key,
                   secure=False)

    # Create the "logs" bucket. An already-existing bucket is fine; any other
    # error (including ResponseError) propagates and aborts the script.
    try:
        client.make_bucket("logs")
    except (BucketAlreadyOwnedByYou, BucketAlreadyExists):
        pass

    # Upload the array's raw bytes as object 'log-101' in bucket 'logs'.
    # tobytes() replaces the deprecated tostring() (removed in NumPy 2.0).
    b_data = data1.tobytes()
    try:
        with io.BytesIO(b_data) as f:
            client.put_object('logs',
                              'log-101',
                              f,
                              len(b_data))
    except ResponseError as err:
        print(err)

    # Download the full object and compare it with what was uploaded.
    try:
        resp = client.get_object('logs', 'log-101')
        try:
            # Decode with the dtype that was serialized so the round trip
            # does not depend on frombuffer's default.
            data2 = np.frombuffer(resp.read(), dtype=data1.dtype)
        finally:
            # Always hand the HTTP connection back to the pool, even if
            # reading or decoding fails.
            resp.close()
            resp.release_conn()
        # array_equal also reports False on a length mismatch instead of
        # relying on elementwise broadcasting.
        print(np.array_equal(data1, data2))
    except ResponseError as err:
        print(err)